/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.tinkerpop.gremlin.structure.io.graphson;

import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
import org.apache.tinkerpop.gremlin.structure.io.GraphWriter;
import org.apache.tinkerpop.gremlin.structure.io.Mapper;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.Host;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraphGraphSONDeserializer;
import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
import org.apache.tinkerpop.shaded.jackson.databind.node.JsonNodeType;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Stream;

/**
 * A @{link GraphReader} that constructs a graph from a JSON-based representation of a graph and its elements.
 * This implementation only supports JSON data types and is therefore lossy with respect to data types (e.g. a
 * float will become a double, element IDs may not be retrieved in the format they were serialized, etc.).
 * {@link Edge} and {@link Vertex} objects are serialized to {@code Map} instances.  If an
 * {@link Element} is used as a key, it is coerced to its identifier.  Other complex  objects are converted via
 * {@link Object#toString()} unless there is a mapper serializer supplied.
 *
 * @author Stephen Mallette (http://stephen.genoprime.com)
 */
public final class GraphSONReader implements GraphReader {
    private final ObjectMapper mapper;
    private final long batchSize;
    private final GraphSONVersion version;
    private boolean unwrapAdjacencyList = false;

    final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {};
    final TypeReference<LinkedHashMap<String, Object>> linkedHashMapTypeReference = new TypeReference<LinkedHashMap<String, Object>>() {};

    private GraphSONReader(final Builder builder) {
        mapper = builder.mapper.createMapper();
        batchSize = builder.batchSize;
        unwrapAdjacencyList = builder.unwrapAdjacencyList;
        version = ((GraphSONMapper)builder.mapper).getVersion();
    }

    /**
     * Read data into a {@link Graph} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying
     *                    {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
     * @param graphToWriteTo the graph to write to when reading from the stream.
     */
    @Override
    public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException {
        // dual pass - create all vertices and store to cache the ids.  then create edges.  as long as we don't
        // have vertex labels in the output we can't do this single pass
        final Map<StarGraph.StarVertex,Vertex> cache = new HashMap<>();
        final AtomicLong counter = new AtomicLong(0);

        final boolean supportsTx = graphToWriteTo.features().graph().supportsTransactions();
        final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge();

        readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new ByteArrayInputStream(line.getBytes()), null, null, Direction.IN))).forEach(vertex -> {
            final Attachable<Vertex> attachable = (Attachable<Vertex>) vertex;
            cache.put((StarGraph.StarVertex) attachable.get(), attachable.attach(Attachable.Method.create(graphToWriteTo)));
            if (supportsTx && counter.incrementAndGet() % batchSize == 0)
                graphToWriteTo.tx().commit();
        });
        cache.entrySet().forEach(kv -> kv.getKey().edges(Direction.IN).forEachRemaining(e -> {
            // can't use a standard Attachable attach method here because we have to use the cache for those
            // graphs that don't support userSuppliedIds on edges.  note that outVertex/inVertex methods return
            // StarAdjacentVertex whose equality should match StarVertex.
            final Vertex cachedOutV = cache.get(e.outVertex());
            final Vertex cachedInV = cache.get(e.inVertex());

            if (null == cachedOutV) throw new IllegalStateException(String.format("Could not find outV with id [%s] to create edge with id [%s]", e.outVertex().id(), e.id()));
            if (null == cachedInV) throw new IllegalStateException(String.format("Could not find inV with id [%s] to create edge with id [%s]", e.inVertex().id(), e.id()));

            final Edge newEdge = edgeFeatures.willAllowId(e.id()) ? cachedOutV.addEdge(e.label(), cachedInV, T.id, e.id()) : cachedOutV.addEdge(e.label(), cachedInV);
            e.properties().forEachRemaining(p -> newEdge.property(p.key(), p.value()));
            if (supportsTx && counter.incrementAndGet() % batchSize == 0)
                graphToWriteTo.tx().commit();
        }));

        if (supportsTx) graphToWriteTo.tx().commit();
    }

    /**
     * Read {@link Vertex} objects from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
     * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
     * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
     * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
     */
    @Override
    public Iterator<Vertex> readVertices(final InputStream inputStream,
                                         final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
                                         final Function<Attachable<Edge>, Edge> edgeAttachMethod,
                                         final Direction attachEdgesOfThisDirection) throws IOException {
        return readVertexStrings(inputStream).<Vertex>map(FunctionUtils.wrapFunction(line -> readVertex(new ByteArrayInputStream(line.getBytes()), vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection))).iterator();
    }

    /**
     * Read a {@link Vertex}  from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream a stream containing at least a single vertex as defined by the accompanying
     *                    {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
     * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
     */
    @Override
    public Vertex readVertex(final InputStream inputStream, final Function<Attachable<Vertex>, Vertex> vertexAttachMethod) throws IOException {
        return readVertex(inputStream, vertexAttachMethod, null, null);
    }

    /**
     * Read a {@link Vertex} from output generated by any of the {@link GraphSONWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GraphSONWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying
     *                    {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
     * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
     * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
     * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
     */
    @Override
    public Vertex readVertex(final InputStream inputStream,
                             final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
                             final Function<Attachable<Edge>, Edge> edgeAttachMethod,
                             final Direction attachEdgesOfThisDirection) throws IOException {
        // graphson v3 has special handling for generic Map instances, by forcing to linkedhashmap (which is probably
        // what it should have been anyway) stargraph format can remain unchanged across all versions
        final Map<String, Object> vertexData = version == GraphSONVersion.V3_0 ?
                mapper.readValue(inputStream, linkedHashMapTypeReference) : mapper.readValue(inputStream, mapTypeReference);
        final StarGraph starGraph = StarGraphGraphSONDeserializer.readStarGraphVertex(vertexData);
        if (vertexAttachMethod != null) vertexAttachMethod.apply(starGraph.getStarVertex());

        if (vertexData.containsKey(GraphSONTokens.OUT_E) && (attachEdgesOfThisDirection == Direction.BOTH || attachEdgesOfThisDirection == Direction.OUT))
            StarGraphGraphSONDeserializer.readStarGraphEdges(edgeAttachMethod, starGraph, vertexData, GraphSONTokens.OUT_E);

        if (vertexData.containsKey(GraphSONTokens.IN_E) && (attachEdgesOfThisDirection == Direction.BOTH || attachEdgesOfThisDirection == Direction.IN))
            StarGraphGraphSONDeserializer.readStarGraphEdges(edgeAttachMethod, starGraph, vertexData, GraphSONTokens.IN_E);

        return starGraph.getStarVertex();
    }

    /**
     * Read an {@link Edge} from output generated by {@link GraphSONWriter#writeEdge(OutputStream, Edge)} or via
     * an {@link Edge} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
     *
     * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying
     *                    {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
     * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
     */
    @Override
    public Edge readEdge(final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod) throws IOException {
        if (version == GraphSONVersion.V1_0) {
            final Map<String, Object> edgeData = mapper.readValue(inputStream, mapTypeReference);

            final Map<String, Object> edgeProperties = edgeData.containsKey(GraphSONTokens.PROPERTIES) ?
                    (Map<String, Object>) edgeData.get(GraphSONTokens.PROPERTIES) : Collections.EMPTY_MAP;
            final DetachedEdge edge = new DetachedEdge(edgeData.get(GraphSONTokens.ID),
                    edgeData.get(GraphSONTokens.LABEL).toString(),
                    edgeProperties,
                    edgeData.get(GraphSONTokens.OUT), edgeData.get(GraphSONTokens.OUT_LABEL).toString(),
                    edgeData.get(GraphSONTokens.IN), edgeData.get(GraphSONTokens.IN_LABEL).toString());

            return edgeAttachMethod.apply(edge);
        } else {
            return edgeAttachMethod.apply((DetachedEdge) mapper.readValue(inputStream, Edge.class));
        }
    }

    /**
     * Read a {@link VertexProperty} from output generated by
     * {@link GraphSONWriter#writeVertexProperty(OutputStream, VertexProperty)} or via an {@link VertexProperty} passed
     * to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
     *
     * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying
     *                    {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
     * @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
     *                                   {@link Host} object.
     */
    @Override
    public VertexProperty readVertexProperty(final InputStream inputStream,
                                             final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException {
        if (version == GraphSONVersion.V1_0) {
            final Map<String, Object> vpData = mapper.readValue(inputStream, mapTypeReference);
            final Map<String, Object> metaProperties = (Map<String, Object>) vpData.get(GraphSONTokens.PROPERTIES);
            final DetachedVertexProperty vp = new DetachedVertexProperty(vpData.get(GraphSONTokens.ID),
                    vpData.get(GraphSONTokens.LABEL).toString(),
                    vpData.get(GraphSONTokens.VALUE), metaProperties);
            return vertexPropertyAttachMethod.apply(vp);
        } else {
            return vertexPropertyAttachMethod.apply((DetachedVertexProperty) mapper.readValue(inputStream, VertexProperty.class));
        }
    }

    /**
     * Read a {@link Property} from output generated by  {@link GraphSONWriter#writeProperty(OutputStream, Property)} or
     * via an {@link Property} passed to {@link GraphSONWriter#writeObject(OutputStream, Object)}.
     *
     * @param inputStream a stream containing at least one {@link Property} as written by the accompanying
     *                    {@link GraphWriter#writeProperty(OutputStream, Property)} method.
     * @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
     */
    @Override
    public Property readProperty(final InputStream inputStream,
                                 final Function<Attachable<Property>, Property> propertyAttachMethod) throws IOException {
        if (version == GraphSONVersion.V1_0) {
            final Map<String, Object> propertyData = mapper.readValue(inputStream, mapTypeReference);
            final DetachedProperty p = new DetachedProperty(propertyData.get(GraphSONTokens.KEY).toString(), propertyData.get(GraphSONTokens.VALUE));
            return propertyAttachMethod.apply(p);
        } else {
            return propertyAttachMethod.apply((DetachedProperty) mapper.readValue(inputStream, Property.class));
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException {
        return mapper.readValue(inputStream, clazz);
    }

    private Stream<String> readVertexStrings(final InputStream inputStream) throws IOException {
        if (unwrapAdjacencyList) {
            final JsonNode root = mapper.readTree(inputStream);
            final JsonNode vertices = root.get(GraphSONTokens.VERTICES);
            if (!vertices.getNodeType().equals(JsonNodeType.ARRAY)) throw new IOException(String.format("The '%s' key must be an array", GraphSONTokens.VERTICES));
            return IteratorUtils.stream(vertices.elements()).map(Object::toString);
        } else {
            final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
            return br.lines();
        }
    }

    public static Builder build() {
        return new Builder();
    }

    public final static class Builder implements ReaderBuilder<GraphSONReader> {
        private long batchSize = 10000;

        private Mapper<ObjectMapper> mapper = GraphSONMapper.build().create();
        private boolean unwrapAdjacencyList = false;

        private Builder() {}

        /**
         * Number of mutations to perform before a commit is executed when using
         * {@link GraphSONReader#readGraph(InputStream, Graph)}.
         */
        public Builder batchSize(final long batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /**
         * Override all of the {@link GraphSONMapper} builder
         * options with this mapper.  If this value is set to something other than null then that value will be
         * used to construct the writer.
         */
        public Builder mapper(final Mapper<ObjectMapper> mapper) {
            this.mapper = mapper;
            return this;
        }

        /**
         * If the adjacency list is wrapped in a JSON object, as is done when writing a graph with
         * {@link GraphSONWriter.Builder#wrapAdjacencyList(boolean)} wrapAdjacencyList} set to {@code true}, this
         * setting needs to be set to {@code true} to properly read it.  By default, this value is {@code false} and
         * the adjacency list is simply read as line delimited vertices.
         * <p/>
         * By setting this value to {@code true}, the generated JSON is no longer "splittable" by line and thus not
         * suitable for OLAP processing.  Furthermore, reading this format of the JSON with
         * {@link GraphSONReader#readGraph(InputStream, Graph)} or
         * {@link GraphSONReader#readVertices(InputStream, Function, Function, Direction)} requires that the
         * entire JSON object be read into memory, so it is best saved for "small" graphs.
         */
        public Builder unwrapAdjacencyList(final boolean unwrapAdjacencyList) {
            this.unwrapAdjacencyList = unwrapAdjacencyList;
            return this;
        }

        public GraphSONReader create() {
            return new GraphSONReader(this);
        }
    }
}
